package kafka

import (
	"context"
	"fmt"
	"github.com/Shopify/sarama"
	"log"
	"sync"
)

func NewKafkaConsumerGroup(brokers []string, topics []string, group string, businessCall func(message *sarama.ConsumerMessage) bool) *ConsumerGroup {
	k := &ConsumerGroup{
		brokers:           brokers,
		topics:            topics,
		group:             group,
		channelBufferSize: 2,
		ready:             make(chan bool),
		version:           "1.1.1",
		handler:           businessCall,
	}
	k.Init()
	return k
}

// ConsumerGroup 消费者组(consumer group): 相同的group.id的消费者将视为同一个消费者组,
// 每个消费者都需要设置一个组id, 每条消息只能被 consumer group 中的一个
// Consumer 消费,但可以被多个 consumer group 消费
type ConsumerGroup struct {
	//代理(broker): 一台kafka服务器称之为一个broker
	brokers []string
	//主题(topic): 消息的一种逻辑分组，用于对消息分门别类，每一类消息称之为一个主题，相同主题的消息放在一个队列中
	topics            []string
	version           string
	ready             chan bool
	group             string
	channelBufferSize int
	//业务调用
	handler func(message *sarama.ConsumerMessage) bool
}

func (k *ConsumerGroup) Init() func() {

	version, err := sarama.ParseKafkaVersion(k.version)
	if err != nil {
		fmt.Printf("Error parsing Kafka version: %v", err)
	}
	cfg := sarama.NewConfig()
	cfg.Version = version
	// 分区分配策略
	cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	// 未找到组消费位移的时候从哪边开始消费
	cfg.Consumer.Offsets.Initial = -2
	// channel长度
	cfg.ChannelBufferSize = k.channelBufferSize
	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(k.brokers, k.group, cfg)
	if err != nil {
		fmt.Printf("Error creating consumer group client: %v", err)
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer func() {
			wg.Done()
			//util.HandlePanic("client.Consume panic", log.StandardLogger())
		}()
		for {
			if err := client.Consume(ctx, k.topics, k); err != nil {
				log.Printf("Error from consumer: %v", err)
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				log.Println(ctx.Err())
				return
			}
			k.ready = make(chan bool)
		}
	}()

	<-k.ready
	fmt.Printf("Sarama consumer up and running!... \n")
	// 保证在系统退出时，通道里面的消息被消费
	return func() {
		cancel()
		wg.Wait()
		if err = client.Close(); err != nil {
			fmt.Printf("Error closing client: %v  \n", err)
		}
	}

}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (k *ConsumerGroup) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(k.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (k *ConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (k *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	// 具体消费消息
	for message := range claim.Messages() {
		//msg := string(message.Value)
		//k.logger.Infof("卡夫卡: %s", msg)

		if ok := k.handler(message); ok {
			// 更新位移
			session.MarkMessage(message, "")
		}
		//run.Run(msg)
	}
	return nil
}
